Shared Variables

In a typical Spark program, we define some variables and then some functions that manipulate them. When a job runs on a cluster, Spark ships a separate copy of these functions and variables to every node. If a node updates a variable, the other nodes, including the driver, remain unaware of the change. Most of the time this behavior is acceptable, but there are cases where a value needs to be shared across the cluster. Since it would be inefficient to support general, read-write shared variables across tasks, Spark instead offers two restricted types of shared variables: Broadcast Variables and Accumulators.
 
Distributed Caching with Broadcast Variables
 
A broadcast variable is a read-only variable that is cached in serialized form in memory on each Spark worker node, rather than being shipped with every task. Without broadcast variables, the same data would be shipped to the executors with the tasks of every transformation and action, which can cause significant network overhead.
  • Broadcasting reduces communication cost, which speeds up lookup and join operations.
  • Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. Spark automatically broadcasts the common data needed by tasks within each stage. Data broadcast in this way is cached in serialized form and deserialized before running each task.
  • Explicitly creating broadcast variables is therefore mostly useful when tasks across multiple stages need the same data, or when caching the data in deserialized form is important.
  • For Spark SQL joins, spark.sql.autoBroadcastJoinThreshold sets the maximum size of a table that is broadcast automatically; the default is 10 MB.
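
As a rough illustration of the Spark SQL threshold mentioned in the last point, the sketch below sets the threshold and also gives an explicit broadcast hint for a join. The table contents, column names, and the local-mode session are assumptions made for the example; in spark-shell the `spark` session already exists.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Local session for illustration only (assumption); spark-shell provides `spark` already.
val spark = SparkSession.builder().master("local[*]").appName("broadcast-join-sketch").getOrCreate()
import spark.implicits._

// Hypothetical data: a fact table and a small lookup table.
val orders = Seq((1, 14), (2, 19), (3, 11)).toDF("order_id", "pincode")
val states = Seq((11, "Delhi"), (14, "Punjab"), (19, "Kashmir")).toDF("pincode", "state")

// Tables below this size (in bytes) are broadcast automatically; 10 MB is the default.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)

// Explicit hint: ask Spark to broadcast the small table for this join.
val joined = orders.join(broadcast(states), "pincode")
val result = joined.orderBy("order_id").select("order_id", "state").as[(Int, String)].collect().toList
```

The explicit `broadcast` hint is useful when Spark cannot estimate the size of the small table well enough to pick a broadcast join on its own.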
Broadcast Variable Use Case
Imagine that during a transformation we need to look up values in a large table of zip codes/pin codes. It is not feasible to send the large lookup table to the executors with every task, nor can we query the database for every record. The solution is to convert this lookup table into a broadcast variable, which Spark caches on every executor for future reference.
 
Code without broadcast variables looks like this:
val pincode_map = Map(11 -> "Delhi", 12 -> "Haryana",14 -> "Punjab", 17 -> "Himachal Pradesh",18 -> "Jammu",19 -> "Kashmir")
val list_of_pincodes: List[Int] = List(14, 19, 18, 18, 12, 17, 11, 19, 17, 19, 19)

val RDDpincodes = sc.parallelize(list_of_pincodes)

val rdd_of_cities = RDDpincodes.map(pincode => pincode_map(pincode))
print(rdd_of_cities.collect().mkString("\n"))
 
 
Broadcast Variable

Code with a broadcast variable looks like this:
 
val pincode_map = Map(11 -> "Delhi", 12 -> "Haryana",14 -> "Punjab", 17 -> "Himachal Pradesh",18 -> "Jammu",19 -> "Kashmir")

val list_of_pincodes = List(14, 19, 18, 18, 12, 17, 11, 19, 17, 19, 19)

val RDDpincodes = sc.parallelize(list_of_pincodes)

val pincode_map_broadcast = sc.broadcast(pincode_map)

// Access value from broadcast variable
val rdd_of_cities = RDDpincodes.map(pincode => pincode_map_broadcast.value(pincode))
print(rdd_of_cities.collect().mkString("\n"))
 
 
 
Accessing the Broadcast variables
 
val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar.value
 
The value Function in Broadcast variable
  • The value method first checks that the broadcast variable is still valid, that is, that it has not been destroyed.
  • It then returns the executor's local copy of the broadcast data, so the access happens locally as part of task execution.
  • By default, the broadcast value is cached on each machine that uses it.
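
A minimal sketch of reading the local copy inside a task is shown below. The lookup data, the "unknown" fallback, and the local-mode session are assumptions for illustration; `mapPartitions` is used here only so that `.value` is read once per partition instead of once per record.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only (assumption); spark-shell provides `sc` already.
val spark = SparkSession.builder().master("local[*]").appName("broadcast-value-sketch").getOrCreate()
val sc = spark.sparkContext

val lookup = sc.broadcast(Map(11 -> "Delhi", 12 -> "Haryana"))
val codes = sc.parallelize(List(11, 12, 99))

val cities = codes.mapPartitions { iter =>
  // Runs on the executor: fetch the local copy of the broadcast data once per partition.
  val local = lookup.value
  // Fall back to a placeholder for codes missing from the lookup table.
  iter.map(code => local.getOrElse(code, "unknown"))
}.collect().toList
```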
Working of Broadcast variables

When we run a Spark job that contains broadcast variables, Spark does the following:
  • It breaks the job into stages, separated by distributed shuffle operations.
  • Each stage is then broken into tasks.
  • Spark broadcasts the common (reusable) data needed by tasks within each stage.
  • The broadcast data is cached in serialized form and deserialized before executing each task.
When running Spark in local mode with a small dataset, the run-time improvement from broadcast variables is not visible. The benefit appears when the job is split into many tasks that all access the same large dataset.
 
Destroy Broadcast variables
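  • broadcastVar.unpersist() deletes the cached copies of the broadcast variable on the executors. If the variable is used again, it is re-broadcast.
  • broadcastVar.destroy() removes all data and metadata related to the broadcast variable. Once destroyed, it cannot be used again.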
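
The difference between the two calls can be sketched as follows. The data and the local-mode session are assumptions for illustration; `Try` is used only to capture the failure that is expected when a destroyed broadcast variable is read.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Try

// Local session for illustration only (assumption); spark-shell provides `sc` already.
val spark = SparkSession.builder().master("local[*]").appName("broadcast-cleanup-sketch").getOrCreate()
val sc = spark.sparkContext

val broadcastVar = sc.broadcast(Array(1, 2, 3))
val indices = sc.parallelize(List(0, 1, 2))

val first = indices.map(i => broadcastVar.value(i)).collect().toList

// Drop the cached copies on the executors; the data is sent again on next use.
broadcastVar.unpersist()
val second = indices.map(i => broadcastVar.value(i)).collect().toList

// Release all data and metadata; any later use of the variable fails.
broadcastVar.destroy()
val afterDestroy = Try(broadcastVar.value)
```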
Broadcast variables Examples
 
Use case 1
val input = sc.parallelize(List(1, 2, 3))
val localVal = 2

val added = input.map( x => x + localVal)
added.foreach(println)

val multiplied = input.map( x => x * 2)
multiplied.foreach(println)

// Same addition, this time reading the value from a broadcast variable
val broadcastVar = sc.broadcast(2)
val addedWithBroadcast = input.map(x => broadcastVar.value + x)
addedWithBroadcast.foreach(println)

Use case 2
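val emp = sc.parallelize(Seq(("Jhon", 1), ("Martin", 2), ("Nancy", 3), ("Bob", 2), ("Sony", 4), ("Smith", 3)))
val dp = Map(1 -> 'a', 2 -> 'b', 3 -> 'c')

val dep = sc.broadcast(dp)
// Department 4 is not in the map, so fall back to '?' instead of failing on a missing key
emp.map(x => x._1 + "," + x._2 + "," + dep.value.getOrElse(x._2, '?')).collect().foreach(println)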
 
Note: Data broadcast this way is cached in serialized form and deserialized before running each task. If the data being broadcast is very large, serialization and deserialization become costly, so avoid broadcast variables in such cases. Also, once a value has been broadcast, do not modify it, so that every node has exactly the same copy of the data. Otherwise, a modified value might be shipped to a node later and give unexpected results.
 

Accumulator 

Accumulators are the other kind of shared variable. While broadcast variables are read-only, accumulators are variables that the tasks running as part of a job can only add to. Accumulators help with use cases such as counting the number of failed records across a complete cluster, the total number of records associated with a product ID, or the number of basket check-outs in a window.

Spark’s accumulators allow multiple workers to write to a shared variable, but do not allow them to read it. Only the driver can read the accumulator’s value, by calling .value on the accumulator.

To create a named numeric accumulator, which starts at zero, use the following code:

// Initialise an accumulator with name "Failed records accumulator"
val failed_records_acc = sc.longAccumulator("Failed records accumulator") 

Accumulators are “added” to through an associative and commutative operation only and thus, they can be efficiently supported in parallel. So, to increment the value of an accumulator, use the following code:

....
records.foreach { record =>
  // update the value of the accumulator (record.failed is a placeholder flag)
  if (record.failed) failed_records_acc.add(1)
}
.....
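
Put together, creating, updating, and reading an accumulator might look like the sketch below. The sample records, represented here as hypothetical (id, failed) pairs, and the local-mode session are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only (assumption); spark-shell provides `sc` already.
val spark = SparkSession.builder().master("local[*]").appName("accumulator-sketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical records: (id, failed)
val records = sc.parallelize(Seq((1, false), (2, true), (3, true)))

val failedRecordsAcc = sc.longAccumulator("Failed records accumulator")

// foreach is an action, so the updates are applied when this statement runs.
records.foreach { case (_, failed) =>
  if (failed) failedRecordsAcc.add(1)
}

// Only the driver reads the accumulated value.
val failedCount: Long = failedRecordsAcc.value
```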

Spark not only supports numeric accumulators, but also allows programmers to create their own types by subclassing the AccumulatorV2 abstract class and implementing its methods.

 

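import org.apache.spark.util.AccumulatorV2

/**
 * A custom accumulator for string concatenation
 */
class StringAccumulator(private var _value: String) extends AccumulatorV2[String, String] {

    def this() = this("")

    // True when nothing has been accumulated yet
    override def isZero: Boolean = _value.isEmpty
    // Creates a new accumulator holding the same value
    override def copy(): StringAccumulator = new StringAccumulator(_value)
    // Resets the accumulator
    override def reset(): Unit = {
        _value = ""
    }
    // Accumulates the input to the current value
    override def add(newValue: String): Unit = {
        _value = (_value + " " + newValue.trim).trim
    }
    // Merges another accumulator of the same type into the current one
    override def merge(other: AccumulatorV2[String, String]): Unit = {
        add(other.value)
    }
    // Returns the current accumulated value
    override def value: String = _value
}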
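
A custom accumulator has to be registered with the SparkContext before tasks can use it. The sketch below shows this with a separate, hypothetical MaxAccumulator (not part of Spark) that keeps the largest value seen; the sample data and the local-mode session are also assumptions. A maximum is used because it is associative and commutative, so the result does not depend on task order.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.AccumulatorV2

// Hypothetical accumulator that keeps the largest value seen so far.
class MaxAccumulator extends AccumulatorV2[Long, Long] {
  private var _max: Long = Long.MinValue
  // The zero state means no value has been added yet.
  override def isZero: Boolean = _max == Long.MinValue
  override def copy(): MaxAccumulator = {
    val acc = new MaxAccumulator
    acc._max = _max
    acc
  }
  override def reset(): Unit = { _max = Long.MinValue }
  override def add(v: Long): Unit = { _max = math.max(_max, v) }
  // Combines the partial result from another task into this one.
  override def merge(other: AccumulatorV2[Long, Long]): Unit = {
    _max = math.max(_max, other.value)
  }
  override def value: Long = _max
}

// Local session for illustration only (assumption); spark-shell provides `sc` already.
val spark = SparkSession.builder().master("local[*]").appName("custom-accumulator-sketch").getOrCreate()
val sc = spark.sparkContext

val maxAcc = new MaxAccumulator
// Register the accumulator so Spark tracks it and merges task results on the driver.
sc.register(maxAcc, "Max value accumulator")

sc.parallelize(Seq(3L, 42L, 7L)).foreach(v => maxAcc.add(v))
val largest = maxAcc.value
```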

Limitations of Accumulators

When using accumulators, there are some caveats that we as programmers need to be aware of:

  • Spark evaluates computations inside transformations lazily. As a result, accumulator updates inside functions like map() or filter() are not executed until an action is triggered.
  • For accumulator updates performed inside actions, Spark guarantees that each task's update is applied only once, so restarted tasks do not update the value again.
  • Spark does not guarantee this for transformations. A task's update may be applied more than once if tasks or job stages are re-executed.
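
The first and third caveats can be observed with a small experiment, sketched below under the assumption of a local-mode session and an uncached RDD. The accumulator is updated inside map(), so nothing is counted until an action runs, and running a second action recomputes the map and counts every element again.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only (assumption); spark-shell provides `sc` already.
val spark = SparkSession.builder().master("local[*]").appName("accumulator-caveats-sketch").getOrCreate()
val sc = spark.sparkContext

val seenAcc = sc.longAccumulator("Seen records")
val numbers = sc.parallelize(1 to 5)

// The accumulator update sits inside a transformation, so it is lazy.
val doubled = numbers.map { n => seenAcc.add(1); n * 2 }

val beforeAction: Long = seenAcc.value      // 0: no action has run yet

doubled.count()
val afterFirstAction: Long = seenAcc.value  // 5: one update per element

// `doubled` is not cached, so this action recomputes the map and updates again.
doubled.count()
val afterSecondAction: Long = seenAcc.value // 10: every element counted twice
```

Caching `doubled` or moving the update into an action such as foreach() are two common ways to avoid the double counting.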

A good rule of thumb to follow is to use accumulators only for data you would consider to be a side effect of your main data processing application.
